Spark SQL examples

# -*- coding: utf-8 -*-
import os

os.chdir("/home/cloudops/spark")
os.getcwd()   # confirm the working directory

# Create a SQL Context from Spark Context
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)
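
# NOTE: in Spark 2.x+, SparkSession is the preferred entry point and
# subsumes SQLContext. A minimal sketch for a standalone script (in the
# pyspark shell, `sc` is created for you):
#
# from pyspark.sql import SparkSession
# spark = SparkSession.builder.appName("sparkSqlExamples").getOrCreate()
# sc = spark.sparkContext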

#============================
# Working with Data Frames
#============================

# Create a Data Frame from a JSON file
# Example:
# { "name": "Benjamin Garrison",
#   "gender": "male",
#   "deptid": "100",
#   "age": "32",
#   "salary": "3000" }
empDf = sqlContext.read.json("data/customerData.json")

empDf.show()
# +---+------+------+-----------------+------+
# |age|deptId|gender|             name|salary|
# +---+------+------+-----------------+------+
# | 32|   100|  male|Benjamin Garrison|  3000|
# | 40|   200|  male|    Holland Drake|  4500|
# | 26|   100|  male|  Burks Velasquez|  2700|
# | 51|   100|female|    June Rutledge|  4300|
# | 44|   200|  male|    Nielsen Knapp|  6500|
# +---+------+------+-----------------+------+

empDf.printSchema()
# root
# |-- age: string (nullable = true)
# |-- deptId: string (nullable = true)
# |-- gender: string (nullable = true)
# |-- name: string (nullable = true)
# |-- salary: string (nullable = true)
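
# All fields were inferred as strings because the JSON values are quoted.
# A minimal sketch casting the numeric columns (names as shown above);
# Spark otherwise casts them implicitly in arithmetic:
from pyspark.sql.functions import col

empTypedDf = empDf.withColumn("age", col("age").cast("int")) \
                  .withColumn("salary", col("salary").cast("int"))
empTypedDf.printSchema()
# |-- age: integer (nullable = true)   (salary likewise)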

# Do SQL queries
empDf.select("name", "salary").show()
# +-----------------+------+
# |             name|salary|
# +-----------------+------+
# |Benjamin Garrison|  3000|
# . . .

empDf.filter(empDf["age"] == 40).show()
# +---+------+------+-------------+------+
# |age|deptId|gender|         name|salary|
# +---+------+------+-------------+------+
# | 40|   200|  male|Holland Drake|  4500|
# +---+------+------+-------------+------+
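
# filter() also accepts an SQL expression string instead of a Column:
empDf.filter("age = 40").show()   # same result as the expression above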

empDf.groupBy("gender").count().show()
# +------+-----+
# |gender|count|
# +------+-----+
# |female|    1|
# |  male|    4|
# +------+-----+

empDf.groupBy("deptId").\
    agg({"salary": "avg", "age": "max"}).show()
# +------+------------------+--------+
# |deptId|       avg(salary)|max(age)|
# +------+------------------+--------+
# |   200|            5500.0|      44|
# |   100|3333.3333333333335|      51|
# +------+------------------+--------+
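
# The dict form of agg() fixes the result column names (avg(salary),
# max(age)). With pyspark.sql.functions the columns can be aliased;
# a sketch producing the same values under friendlier names:
import pyspark.sql.functions as F

empDf.groupBy("deptId") \
     .agg(F.avg("salary").alias("avgSalary"),
          F.max("age").alias("maxAge")) \
     .show()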

# =====================================
# Create a Data Frame from a List
# =====================================
# NOTES:
# 1. The key field must not be named deptId, or the
#    groupBy("deptId") after the JOIN below becomes ambiguous
# 2. Upper/lower case in column names is ignored when Spark SQL
#    resolves them (case-insensitive by default)
deptList = [{'deptName': 'Sales', 'id': "100"},
            {'deptName': 'Engineering', 'id': "200"}]
deptDf = sqlContext.createDataFrame(deptList)
deptDf.show()
# +-----------+---+
# |   deptName| id|
# +-----------+---+
# |      Sales|100|
# |Engineering|200|
# +-----------+---+
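
# Newer PySpark versions warn that inferring a schema from dicts is
# deprecated; the same frame can be built from Row objects. A sketch:
from pyspark.sql import Row

Dept = Row("deptName", "id")
deptDf2 = sqlContext.createDataFrame([Dept("Sales", "100"),
                                      Dept("Engineering", "200")])
deptDf2.show()   # same contents as deptDf above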

# =====================================
# JOIN the Data Frames
# =====================================
empDf.join(deptDf, empDf.deptId == deptDf.id).show()

# =====================================
# Cascading operations
# =====================================
# NOTE: deptDf's key is named "id" on purpose - if it were also
# named deptId, the groupBy("deptId") below would fail with
# "ERROR: Reference 'deptId' is ambiguous"
empDf.filter(empDf["age"] > 30).join(deptDf,\
        empDf.deptId == deptDf.id).\
        groupBy("deptId").\
        agg({"salary": "avg", "age": "max"}).show()
# +------+-----------+--------+
# |deptId|avg(salary)|max(age)|
# +------+-----------+--------+
# |   200|     5500.0|      44|
# |   100|     3650.0|      51|
# +------+-----------+--------+
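
# If both frames carried a deptId column, the groupBy above would fail.
# Table aliases disambiguate the references; a sketch (equivalent to the
# cascade above):
from pyspark.sql.functions import col

e, d = empDf.alias("e"), deptDf.alias("d")
e.filter(col("e.age") > 30) \
 .join(d, col("e.deptId") == col("d.id")) \
 .groupBy("deptId") \
 .agg({"salary": "avg", "age": "max"}).show()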

# =====================================
# Register a Data Frame as table and
# run SQL statements against it
# =====================================
empDf.registerTempTable("employees")
deptDf.registerTempTable("departments")
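
# NOTE: in Spark 2.x+ registerTempTable is deprecated in favour of the
# equivalent createOrReplaceTempView:
# empDf.createOrReplaceTempView("employees")
# deptDf.createOrReplaceTempView("departments")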

sqlContext.sql("SELECT * FROM employees WHERE salary > 4000").show()
sqlContext.sql("SELECT * FROM departments").show()
# +---+------+------+-------------+------+
# |age|deptId|gender|         name|salary|
# +---+------+------+-------------+------+
# | 40|   200|  male|Holland Drake|  4500|
# | 51|   100|female|June Rutledge|  4300|
# | 44|   200|  male|Nielsen Knapp|  6500|
# +---+------+------+-------------+------+

sqlContext.sql("SELECT e.deptId, \
                       AVG(e.salary) AS avgSalary, \
                       MAX(e.age) AS maxAge \
                  FROM employees e \
                    JOIN departments d \
                       ON e.deptId = d.id \
                  WHERE e.age > 30 \
                  GROUP BY e.deptId").show()
# +------+---------+------+
# |deptId|avgSalary|maxAge|
# +------+---------+------+
# |   200|   5500.0|    44|
# |   100|   3650.0|    51|
# +------+---------+------+

# =====================================
# Pandas (Data Frame)
# =====================================
# Pandas >= 0.19.2 must be installed for toPandas()
empPandas = empDf.toPandas()
for index, row in empPandas.iterrows():
    print(row["salary"])

# =====================================
# Database (JDBC -> MySQL)
# =====================================
# Make sure that the spark classpaths are set
# appropriately in the spark-defaults.conf file
# to include the driver files
# NOTES:
# 1. dbtable may be a table name or a parenthesized SQL subquery
# 2. DB driver: the MySQL connector JAR, which must be on the
#    classpath configured above
# mysql -u admin -D wp -p
# > show tables;
# > select * from wp_terms;
# In Spark 2.x: wpTermsDf = spark.read.format("jdbc").options(...)
wpTermsDf = sqlContext.read.format("jdbc").options(
    url="jdbc:mysql://localhost:3306/wp",
    driver = "com.mysql.jdbc.Driver",
    dbtable = "wp_terms",
    user="admin",
    password="blueC0aT").load()

wpTermsDf.show()
# +-------+--------------------+--------------------+----------+
# |term_id|                name|                slug|term_group|
# +-------+--------------------+--------------------+----------+
# |      1|       Uncategorized|       uncategorized|         0|
# |      2|              simple|              simple|         0|
# |      3|             grouped|             grouped|         0|
# |      4|            variable|            variable|         0|
# . . .
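
# Writing back over JDBC follows the same pattern; a sketch (the target
# table wp_terms_copy is hypothetical):
# wpTermsDf.write.format("jdbc").options(
#     url="jdbc:mysql://localhost:3306/wp",
#     driver="com.mysql.jdbc.Driver",
#     dbtable="wp_terms_copy",
#     user="admin",
#     password="blueC0aT").mode("append").save()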

# =====================================
# Creating data frames from RDD
# =====================================

from pyspark.sql import Row
lines = sc.textFile("auto-data.csv")

# Remove the header line (it contains the column name FUELTYPE)
dataLines = lines.filter(lambda x: "FUELTYPE" not in x)
dataLines.count()

parts = dataLines.map(lambda l: l.split(","))

# Keep make (col 0), body (col 4) and hp (col 7) from each record
autoMap = parts.map(lambda p: \
                    Row(make=p[0], body=p[4], hp=int(p[7])))
autoMap.collect()
# [Row(body='hatchback', hp=69, make='subaru'),
#  Row(body='hatchback', hp=48, make='chevrolet'),
#  Row(body='hatchback', hp=68, make='mazda'),
#  Row(body='hatchback', hp=62, make='toyota'),
# . . .

# Infer the schema, and register the DataFrame as a table
autoDf = sqlContext.createDataFrame(autoMap)

autoDf.registerTempTable("autos")

sqlContext.sql("SELECT * FROM autos WHERE hp > 200").show()
# +-----------+---+-------+
# |       body| hp|   make|
# +-----------+---+-------+
# |    hardtop|207|porsche|
# |    hardtop|207|porsche|
# |      sedan|262| jaguar|
# |convertible|207|porsche|
# +-----------+---+-------+
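
# =====================================
# Explicit schema (alternative to Row inference)
# =====================================
# A sketch building the same DataFrame with an explicit StructType,
# which skips the inference pass and pins down the column types:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

autoSchema = StructType([
    StructField("make", StringType(), True),
    StructField("body", StringType(), True),
    StructField("hp",   IntegerType(), True)])

autoTuples = parts.map(lambda p: (p[0], p[4], int(p[7])))
autoDf2 = sqlContext.createDataFrame(autoTuples, autoSchema)
autoDf2.printSchema()
# root
# |-- make: string (nullable = true)
# |-- body: string (nullable = true)
# |-- hp: integer (nullable = true)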